RocketMQ Connect 実践編 5
Elasticsearch Source -> RocketMQ Connect -> Elasticsearch Sink
準備作業
RocketMQの起動
- Linux/Unix/Mac
- 64ビット JDK 1.8以上;
- Maven 3.2.x以上;
- RocketMQを起動します。RocketMQ 4.x または RocketMQ 5.x のいずれかのバージョンを使用できます。
- ツールを使用してRocketMQのメッセージ送受信をテストします。
ここでは、環境変数NAMESRV_ADDRを使用して、ツールクライアントにRocketMQのNameServerアドレス(localhost:9876)を通知します。
#$ cd distribution/target/rocketmq-4.9.7/rocketmq-4.9.7
$ cd distribution/target/rocketmq-5.1.4/rocketmq-5.1.4
$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
 SendResult [sendStatus=SEND_OK, msgId= ...
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
 ConsumeMessageThread_%d Receive New Messages: [MessageExt...
**注意**: RocketMQにはトピックとグループを自動的に作成する機能があります。メッセージを送信または購読する際に、対応するトピックまたはグループが存在しない場合、RocketMQは自動的にそれらを作成します。そのため、トピックとグループを事前に作成する必要はありません。
これはコンテンツの日本語訳です
コネクタランタイムの構築
リポジトリをクローンし、RocketMQ Connectプロジェクトをビルドします
git clone https://github.com/apache/rocketmq-connect.git
cd rocketmq-connect
export RMQ_CONNECT_HOME=`pwd`
mvn -Prelease-connect -Dmaven.test.skip=true clean install -U
Elasticsearchコネクタプラグインのビルド
Elasticsearch RocketMQコネクタプラグインをビルドします
cd $RMQ_CONNECT_HOME/connectors/rocketmq-connect-elasticsearch/
mvn clean package -Dmaven.test.skip=true
コンパイルされたElasticsearch RocketMQコネクタプラグインのJARファイルを、ランタイムが使用するプラグインディレクトリにコピーします
mkdir -p /Users/YourUsername/rocketmqconnect/connector-plugins
cp target/rocketmq-connect-elasticsearch-1.0.0-jar-with-dependencies.jar /Users/YourUsername/rocketmqconnect/connector-plugins
スタンドアロンモードでコネクタワーカーを実行する
connect-standalone.conf ファイルを変更して、RocketMQの接続アドレスなどの情報を設定します。
cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
vim conf/connect-standalone.conf
設定情報の例は以下のとおりです
workerId=standalone-worker
storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot
## Http port for user to access REST API
httpPort=8082
# Rocketmq namesrvAddr
namesrvAddr=localhost:9876
# RocketMQ acl
aclEnable=false
#accessKey=rocketmq
#secretKey=12345678
clusterName="DefaultCluster"
# Plugin path for loading Source/Sink Connectors
pluginPaths=/Users/YourUsername/rocketmqconnect/connector-plugins
スタンドアロンモードでは、RocketMQ Connectは同期チェックポイント情報をstorePathRootDirで指定されたローカルファイルディレクトリに永続的に保存します。
storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot
同期チェックポイントをリセットする場合は、永続化ファイルを削除します
rm -rf /Users/YourUsername/rocketmqconnect/storeRoot/*
コネクタワーカーをスタンドアロンモードで起動するには
sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
Elasticsearchサービスのセットアップ
Elasticsearchはオープンソースの検索および分析エンジンです。
ソースデータベースとデスティネーションデータベースとして機能させるために、Elasticsearchの2つの別々のDockerインスタンスを使用します
docker pull docker.elastic.co/elasticsearch/elasticsearch:7.15.1
docker run --name es1 -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xms1g -Xmx1g" \  
-v /Users/YourUsername/rocketmqconnect/es/es1_data:/usr/share/elasticsearch/data \  
-d docker.elastic.co/elasticsearch/elasticsearch:7.15.1 
docker run --name es2 -p 9201:9200 -p 9301:9300 -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xms1g -Xmx1g" \ 
-v /Users/YourUsername/rocketmqconnect/es/es2_data:/usr/share/elasticsearch/data \ 
-d docker.elastic.co/elasticsearch/elasticsearch:7.15.1
Dockerコマンドの説明
- --name es2: コンテナの名前を指定します(例:- es2)。
- -p 9201:9200 -p 9301:9300: Elasticsearchコンテナのポート9200と9300をホストポート9201と9301にマッピングし、ホスト経由でElasticsearchサービスにアクセスできるようにします。
- -e discovery.type=single-node: Elasticsearchがクラスタ内の他のノードを検出せずに単一ノードで動作するように設定します。単一サーバーのデプロイに適しています。
- -v /Users/YourUsername/rocketmqconnect/es/es2_data:/usr/share/elasticsearch/data: ホスト上のディレクトリをコンテナ内の- /usr/share/elasticsearch/dataにマウントして、Elasticsearchデータの永続ストレージを提供します。
これは、ホストマシンのポート9200を介してアクセス可能なコンテナに永続データストレージを持つ、カスタム設定されたElasticsearchインスタンスを実行します。ローカルマシンでの開発またはテスト環境に役立ちます。
Elasticsearchログを表示する
docker logs -f es1
docker logs -f es2
Elasticsearchが正常に起動したことを確認します
# Check Elasticsearch instance 1
curl -XGET https://:9200
# Check Elasticsearch instance 2
curl -XGET https://:9201
接続と操作が正しければ、Elasticsearchとそのバージョン番号に関する情報を含むJSONレスポンスが返されます。
Kibanaサービスのセットアップ
Kibanaは、Elasticsearchクラスタに格納されているデータをインタラクティブに探索し、理解できるオープンソースのデータ可視化ツールです。チャート、グラフ、ダッシュボードなどの豊富な機能を提供します。
便宜上、DockerでKibanaの2つの別々のインスタンスをセットアップし、以下のコマンドを使用して、以前に確立したElasticsearchコンテナにリンクします
docker pull docker.elastic.co/kibana/kibana:7.15.1
docker run --name kibana1 --link es1:elasticsearch -p 5601:5601 -d docker.elastic.co/kibana/kibana:7.15.1
docker run --name kibana2 --link es2:elasticsearch -p 5602:5601 -d docker.elastic.co/kibana/kibana:7.15.1
Dockerコマンドの説明
- --name kibana2: 新しいコンテナに名前を割り当てます(例:kibana2)。
- --link es2:elasticsearch: コンテナを別の名前付きElasticsearchインスタンス(この場合は「es2」)にリンクします。これにより、KibanaとElasticsearch間の通信が可能になります。
- -p 5602:5601: Kibanaのデフォルトポート(5601)をホストマシンの同じポートにマッピングして、ブラウザからアクセスできるようにします。
- -d: Dockerコンテナをデタッチモードで実行します。
コンテナが起動したら、そのログ出力を監視できます
docker logs -f kibana1
docker logs -f kibana2
Kibanaコンソールページにアクセスするには、ブラウザで次のアドレスにアクセスしてください
- kibana1: https://:5601
- kibana2:https://:5602
正しくロードされた場合は、それぞれのKibanaインスタンスが正常に起動したことを示します。
ソースElasticsearchにテストデータを書き込む
KibanaのDev Toolsを使用すると、KibanaでElasticsearchを直接操作できます。さまざまなクエリや操作を実行し、返されたデータを分析して理解できます。ドキュメントconsole-kibanaを参照してください。
テストデータの一括書き込み
ブラウザからKibana1コンソールにアクセスし、左側のメニューからDev Toolsを見つけて、ページに次のコマンドを入力してテストデータを書き込みます
POST /_bulk
{ "index" : { "_index" : "connect_es" } }
{ "id": "1", "field1": "value1", "field2": "value2" }
{ "index" : { "_index" : "connect_es" } }
{ "id": "2", "field1": "value3", "field2": "value4" }
注意:
- connect_es: データのインデックス名。
- id/field1/field2: これらはフィールド名であり、1、value1、value2はフィールドの値を表します。
**注意**: rocketmq-connect-elasticsearchには制限があり、>=比較演算(文字列または数値)に使用できるフィールドがデータに必要です。このフィールドは同期チェックポイントの記録に使用されます。上記の例では、idフィールドはグローバルに一意の、インクリメントする数値フィールドです。
データのクエリ
インデックス内のデータをクエリするには、次のコマンドを使用します
GET /connect_es/_search
{
  "size": 100
}
データがない場合、レスポンスは次のようになります
{
  "error" : {
    ... 
    "type" : "index_not_found_exception",
    "reason" : "no such index [connect_es]",
    "resource.type" : "index_or_alias",
    "resource.id" : "connect_es",
    "index_uuid" : "_na_",
    "index" : "connect_es"
  },
  "status" : 404
}
データがある場合、レスポンスは次のようになります
{
  ...
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "connect_es",
        "_type" : "_doc",
        "_id" : "_dx49osBb46Z9cN4hYCg",
        "_score" : 1.0,
        "_source" : {
          "id" : "1",
          "field1" : "value1",
          "field2" : "value2"
        }
      },
      {
        "_index" : "connect_es",
        "_type" : "_doc",
        "_id" : "_tx49osBb46Z9cN4hYCg",
        "_score" : 1.0,
        "_source" : {
          "id" : "2",
          "field1" : "value3",
          "field2" : "value4"
        }
      }
    ]
  }
}
データの削除
テストの繰り返しなどの理由でインデックス内のデータを削除する必要がある場合は、次のコマンドを使用します
DELETE /connect_es
コネクタの起動
Elasticsearchソースコネクタの起動
次のコマンドを実行して、ESソースコネクタを起動します。コネクタはElasticsearchに接続し、connect_esインデックスからドキュメントデータを読み取ります。Elasticsearchドキュメントデータを解析し、汎用ConnectRecordオブジェクトにパッケージ化して、Sinkコネクタが消費するRocketMQトピックに送信します。
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/elasticsearchSourceConnector -d  '{
  "connector.class":"org.apache.rocketmq.connect.elasticsearch.connector.ElasticsearchSourceConnector",
    "elasticsearchHost":"localhost",
    "elasticsearchPort":9200,
    "index":{
        "connect_es": {
            "primaryShards":1,
            "id":1
        }
    },
    "max.tasks":2,
    "connect.topicname":"ConnectEsTopic",
    "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
    "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
}'
**注意**: 起動コマンドは、ソースESがconnect_esインデックスを同期すること、およびインデックスのインクリメントフィールドがidであることを指定します。データはid = 1から取得されます。
curlリクエストがstatus:200を返した場合、作成が成功したことを示し、サンプルレスポンスは次のようになります
{"status":200,"body":{"connector.class":"...
次のログが表示された場合、ファイルソースコネクタが正常に起動したことを示します。
tail -100f ~/logs/rocketmqconnect/connect_runtime.log
コネクタelasticsearchSourceConnectorを起動し、ターゲット状態STARTEDを正常に設定しました!!
Elasticsearchシンクコネクタの起動
次のコマンドを実行して、ESシンクコネクタを起動します。コネクタはRocketMQトピックからデータを購読して消費します。各メッセージをドキュメントデータに変換し、デスティネーションESに書き込みます。
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/elasticsearchSinkConnector -d '{
  "connector.class":"org.apache.rocketmq.connect.elasticsearch.connector.ElasticsearchSinkConnector",
    "elasticsearchHost":"localhost",
    "elasticsearchPort":9201,
    "max.tasks":2,
    "connect.topicnames":"ConnectEsTopic",
    "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
    "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
}'
**注意**: 起動コマンドは、デスティネーションESのアドレスとポートを指定します。これは、Dockerで以前に起動したES2に対応します。
curlリクエストがstatus:200を返した場合、作成が成功したことを示し、サンプルレスポンスは次のようになります
{"status":200,"body":{"connector.class":"...
次のログが表示された場合、ファイルソースコネクタが正常に起動したことを示します
tail -100f ~/logs/rocketmqconnect/connect_runtime.log
コネクタelasticsearchSinkConnectorを起動し、ターゲット状態STARTEDを正常に設定しました!!
シンクコネクタがデスティネーションESインデックスにデータを書き込んだかどうかを確認するには
- ブラウザでKibana2コンソールアドレス(https://:5602)にアクセスします
- Kibana2 Dev Toolsページで、インデックス内のデータをクエリします。ソースES1のデータと一致する場合、コネクタは正常に動作しています。
GET /connect_es/_search
{
  "size": 100
}